feat(jobs): port 5 celery beat tasks from discovery-provider#834
Open
raymondjacobson wants to merge 1 commit into
Open
feat(jobs): port 5 celery beat tasks from discovery-provider#834raymondjacobson wants to merge 1 commit into
raymondjacobson wants to merge 1 commit into
Conversation
Adds five periodic jobs that mirror tasks from apps/packages/discovery-provider's celery beat schedule, plugging gaps left by the discovery → go-openaudio cutover. Each job follows the existing api/jobs/ pattern (Run + ScheduleEvery + mutex-guarded isRunning) and is wired into CoreIndexer.Start. Jobs: - index_hourly_play_counts: rolls up plays into hourly_play_counts every 30s; powers GET /v1/metrics/plays. - prune_plays: deletes plays older than ~400 days, 50k/run cap. - index_user_listening_history: maintains per-user listening_history JSONB blob from plays, capped at 1000 tracks/user. - index_trending (score computation only): refreshes aggregate_interval_plays and trending_params MVs, then runs the trending-score SQL templates from apps' pnagD + AnlGe strategies verbatim into track_trending_scores + playlist_trending_scores. Notifications and tastemaker challenge dispatch are intentionally deferred — they depend on the challenges system which is a separate effort. - update_delist_statuses: polls notifier.audius.co (hardcoded — apps reads from config but we don't need multi-notifier support today) for delist updates, applies them to users.is_available / tracks.is_available + is_delete, advances per-entity cursors in delist_status_cursor. Uses a new signedHTTPGet helper (delegate_auth.go) that ports apps' basic_auth_nonce: ETH personal-sign over a keccak'd timestamp, base64'd into a "Basic <ts>:<sig_hex>" header. Tests: - Pure unit: TestBasicAuthNonce_Shape, TestMergeListeningHistory_CapsToLimit. - DB-backed (test_jobs template): TestHourlyPlayCountsJob_FullPipeline / NoPlays, TestPrunePlaysJob_DeletesOnlyOld / RespectsBatchSize, TestUserListeningHistoryJob_InsertsFirstHistory / MergesAcrossRuns, TestTrendingJob_PopulatesScores (idempotency check included). All `go vet` + `go build` clean; `go test ./jobs/` passes against api-db-1 template DBs.
12 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Ports 5 periodic celery tasks from `apps/packages/discovery-provider` into the existing `api/jobs/` framework. Each job is its own file, follows the `Run` + `ScheduleEvery` + mutex-guarded `isRunning` pattern (matching `record_balance_history.go` / `reclaim_rent.go`), and is scheduled inside `CoreIndexer.Start` next to the existing aggregates calculator.
These fill gaps left after the discovery → go-openaudio cutover. Tables they read/write already exist in api/'s schema (verified against `sql/01_schema.sql`).
Jobs
Plus `delegate_auth.go`: `signedHTTPGet` helper porting apps' `basic_auth_nonce` (ETH personal-sign over keccak'd timestamp, base64'd `Basic :<sig_hex>` header).
Scoping decisions
Trending: score-only. apps' `index_trending.py` also writes trending-mover notifications and calls `index_tastemaker` which dispatches challenge events. Both depend on the challenges/notification systems that aren't ported yet. The scoring half is what makes `/v1/tracks/trending`, `/v1/playlists/trending`, `/v1/tracks/trending/underground` non-empty — that's the user-facing parity.
Trusted notifier hardcoded to `https://notifier.audius.co/\`. apps reads from config; we don't need multi-notifier support today. Easy promotion to a config knob later if needed.
No new config required. `config.Cfg.DelegatePrivateKey` already exists.
Wiring
`startParityJobs` instantiates each job with `ci.Config` and `ci.pool` and calls `ScheduleEvery`. Each job self-manages its goroutine and exits on `ctx.Done()` — they don't join the errgroup since they're best-effort background work.
Test plan
Out of scope (called out in the parity discussion)
🤖 Generated with Claude Code